NOTE : for a better user experience, this notebook uses the  Python Markdown  Jupyter Notebook extension (see the corresponding ReadTheDocs page and the associated GitHub repo for details). Please make sure it is installed and activated in your environment, and that the current notebook is set as "Trusted", for it to work.
In [1]:
import os, sys, cv2
import numpy as np
import pandas as pd
pd.plotting.register_matplotlib_converters() # datetime convert
import datetime, json

import dill as pickle
pickle_root = os.path.realpath(os.path.join('.', '_archive'))
if not os.path.exists(pickle_root): os.makedirs(pickle_root)
model_root = os.path.realpath(os.path.join('.', 'model'))
if not os.path.exists(model_root): os.makedirs(model_root)

from pprint import pprint
from IPython.core.display import display, HTML

import scipy ; print("scipy v." + str(scipy.__version__))
from scipy.fft import rfft, rfftfreq # FastFourrierTransform on Real input
import scipy.stats as stats

import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 12})
%matplotlib inline

%reload_ext autoreload
%autoreload 2

from my_TS_Anomaly_lib.utils import millify, strfdelta, print_timedelta

import pymongo ; print('{:20s}'.format("pymongo v." + str(pymongo.version)), end='-  ')

import findspark ; findspark.init()
import pyspark ; print("pyspark v." + str(pyspark.version.__version__))

import tensorflow ; print('{:20s}'.format("tensorflow v." + str(tensorflow.version.VERSION)), end='-  ')
import tensorboard ; print("tensorboard v." + str(tensorboard.__version__.__str__()))
scipy v.1.5.2
pymongo v.3.11.0    -  pyspark v.2.4.4
tensorflow v.2.3.0  -  tensorboard v.2.4.0

Bearings Vibration Anomaly Detection

Anomaly detection is a very hot topic and is key to many domains. Be it in industrial or consumer-related fields, automation is everywhere. Here are just a couple of examples where anomaly detection has become unavoidable :
  • fraud, such as on bank transactions
  • fake order placement (for instance, to disrupt a market's logistics and infrastructure)
  • stock trading robots (unusual drop or surge in a share price)
Anywhere an alert mechanism needs to be placed on a complex system (where contextual information influences the observed variable, for instance), deep learning Recurrent Neural Networks can be successfully deployed. We chose to illustrate anomaly detection by applying it to an equipment failure watchdog use case. We here develop a semi-supervised anomaly detection solution, i.e. we construct a model representing normal behavior from a given normal training data set, and then test the likelihood that a test instance was generated by the learnt model. If this likelihood falls beyond a certain threshold, the signal is deemed to be of abnormal nature.

1. The Dataset - Bearings Vibration Sensory Time Series (go back to top ↑)

The "Bearing Data Set" is part of the records provided by NASA in its Prognostics Data Repository. This NASA data repository gathers a collection of datasets donated by various universities, agencies, and companies.

It was originally used for the Feb 2006 research paper by J. Lee, H. Qiu, G. Yu, J. Lin, and Rexnord Technical Services, from the Intelligent Maintenance System center - University of Cincinnati, entitled Wavelet filter-based weak signature detection method and its application on rolling element bearing prognostics.
A Spherical Roller Bearing (sectional view)
Bearings test rig and sensor placement illustration
The data comprises measurements from vibration sensors respectively mounted on 4 distinct spherical roller bearings. These bearings were placed along a rotating shaft, as illustrated on the bottom-left figure.
The experiment was of a 'test-to-failure' nature and was reproduced 3 times. All bearing failures occurred after exceeding the designed bearing lifetime of more than 100 million revolutions.

REMARK : for 'test 1', 8 sensors were used (2 per bearing), while for both 'test 2' and 'test 3', only 4 sensors (1 per bearing) were deemed sufficient to be mounted on the test bench.


2. Download Sensors Data to MongoDB (go back to top ↑)

In this companion notebook, we automate the download of the data from the NASA repository and load it into a Mongo DB database. The source data is provided as a compressed set of very numerous files, each holding one second's worth of sensor recordings sampled at 20 kHz (i.e. tens of thousands of time slices). We use  multiprocessing  to speed up the data dumps to Mongo DB.

For our read needs on the newly created nasa_bearing_ims_db Mongo DB database, a user named 'readOnlyUser' is available to us :
In [2]:
from pymongo import MongoClient

username = 'readOnlyUser' ; password = 'readOnlyUser_password'

client = MongoClient('mongodb://%s:%s@127.0.0.1' % (username, password)
                     , appname='Abnormal Vibration Watchdog')
We can, for instance, look at 2 measurement timestep documents from test #2 :
In [3]:
test_id = 2

cursor = client.nasa_ims_database.measurements.find(
    {'test_id': test_id}, limit=2)
for document in cursor : pprint(document)
{'_id': ObjectId('5fa6cc51fa33d3567b40248a'),
 'sensor_1': -0.049,
 'sensor_2': -0.071,
 'sensor_3': -0.132,
 'sensor_4': -0.01,
 'step': 0,
 'test_id': 2,
 'timestamp': datetime.datetime(2004, 2, 12, 10, 32, 39)}
{'_id': ObjectId('5fa6cc51fa33d3567b40248b'),
 'sensor_1': -0.042,
 'sensor_2': -0.073,
 'sensor_3': -0.007,
 'sensor_4': -0.105,
 'step': 1,
 'test_id': 2,
 'timestamp': datetime.datetime(2004, 2, 12, 10, 32, 39)}

REMARK : Once the data is downloaded, we can refer to the README file that comes alongside it. This README provides a little more detail on the experiment run to produce the dataset than introduced in §1.

From there, it is very easy to look at the sensor measurements more broadly. The data is as straightforward as it sounds, i.e. the reported measures account for high and low position displacements, corresponding to oscillations around an idle position (unit unknown). Below we collect data from the first 6 measurements on sensor_1 (still from test #2) :
In [4]:
from my_TS_Anomaly_lib.nasa_bearing_ims_mongo import get_test_first_timestamps

timestamps_count = 6 ; sensor_name = 'sensor_1'

data = get_test_first_timestamps(
    mongo_client = client
    , database_name = 'nasa_ims_database'
    , test_id = test_id
    , timestamps_count = timestamps_count
    , sensor_name = sensor_name
)
timestamps = pd.DatetimeIndex(data['timestamp'].unique()).sort_values(ascending = True)
completed in 00:00:50
For test #2, for instance, the 1-second-long measurements are performed every 10m 00s. Here is what their respective plots look like :
In [5]:
fig, ax = plt.subplots(len(timestamps), 1, figsize=(15, 6), sharey = 'all')
fig.subplots_adjust(hspace = .5, wspace=.001)
ax = ax.ravel()

for i, timestamp in enumerate(timestamps) :
    data[data['timestamp'] == timestamp] \
        .plot('long_datetime', [sensor_name], ax = ax[i])
    ax[i].get_legend().remove() ; ax[i].set_xlabel(None)
    ax[i].set_xticks(ax[i].get_xticks()) # otherwise 'FixedFormatter' warning
    ax[i].set_xticklabels([timestamp]*len(ax[i].get_xticklabels())
                          , rotation=0, ha='center')

fig.suptitle(str(timestamps_count) + " first sensory measurements " +
             "on sensor '" + sensor_name + "' for test #" + str(test_id) + "\n" +
             "(one second each)", fontsize=16)
fig.tight_layout() ; fig.subplots_adjust(top=.9) ; plt.show()
Now, let's look at the spectrum (set of active frequencies) of the vibrations reported in the first 6 measurements, for a better view of the oscillations reported by 'sensor_1' on test #2 :
In [6]:
measurement_steps_count = data[data['timestamp'] == timestamp].shape[0]
# sampling rate of the measuring device (in Hertz)
# (in our case, each measurement lasts exactly 1 second) =>
SAMPLE_RATE = measurement_steps_count

xf = rfftfreq(measurement_steps_count, 1 / SAMPLE_RATE)

fig_rows_count = 2 ; fig_columns_count = -round(-(len(timestamps) / fig_rows_count)) # ceiling division
fig, ax = plt.subplots(fig_rows_count, fig_columns_count, figsize=(15, 6), sharey = 'all')
ax = ax.ravel() ; fig.subplots_adjust(hspace = .1, wspace=.0)

for i, timestamp in enumerate(timestamps) :
    yf = rfft(data[data['timestamp'] == timestamp][sensor_name].values)
    ax[i].plot(xf, abs(yf)) ; ax[i].margins(x=0, y=0)
    ax[i].set_xticks(ax[i].get_xticks()) # otherwise 'FixedFormatter' warning
    xticks = [millify(xtick, True)+'Hz' for xtick in ax[i].get_xticks()]
    xticks[0] = '0' if i%fig_columns_count == 0 else ''
    ax[i].set_xticklabels(xticks, rotation=0, ha='right')

fig.suptitle("Spectrum of the vibration signal recorded by '" + sensor_name + "' " +
             "for the " + str(timestamps_count) + " first measurements " +
             "of test #" + str(test_id), fontsize=16)
fig.subplots_adjust(top=.93) ; plt.show()
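As a quick sanity check of the rfft / rfftfreq usage above, here is a minimal synthetic example (using NumPy's equivalents, which share the scipy.fft signature) : a pure 50 Hz sine sampled at 1 kHz for one second should show a single spectral peak at exactly 50 Hz.

```python
import numpy as np

SAMPLE_RATE = 1_000                        # samples per second
N = SAMPLE_RATE                            # one 1-second-long "measurement"
t = np.arange(N) / SAMPLE_RATE             # time axis, in seconds

signal = np.sin(2 * np.pi * 50 * t)        # pure 50 Hz sine wave

xf = np.fft.rfftfreq(N, 1 / SAMPLE_RATE)   # frequency bins, 0..500 Hz
yf = np.fft.rfft(signal)                   # complex spectrum of the real input

peak_frequency = xf[np.argmax(np.abs(yf))]
print(peak_frequency)                      # -> 50.0
```

The same logic applies to the sensor data above, only with a 20 kHz sampling rate and a real vibration signal instead of a synthetic sine.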
As can be observed in the above figure, at the beginning of the experiment, when a bearing is new and in full working condition, the recorded vibration signal is quite stable.

3. PySpark Connection (go back to top ↑)

We will be interfacing with our NASA IMS measurements Mongo DB database collection through the MONGODB SPARK CONNECTOR . This will allow for distributed access and pre-processing of our data.

Let's first establish a "Mongo DB connector aware" Spark Session for the Jupyter kernel instance running this notebook :
In [7]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import schema

# add the 'mongo-spark-connector' classpath info
# PRIOR to the JVM start on this notebook kernel
spark_jars_directory = os.path.realpath("C:/Users/Organization/.ivy2/jars/")
sys.path.insert(0, spark_jars_directory)
SUBMIT_ARGS = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2 --driver-memory 2G pyspark-shell'
# - where "2.11" is our Scala version
#   (on which our Spark 2.4.4 is running)
# - where 2.4.2 is the mongodb-spark connector version
#   (compatible with our Spark 2.4.4 version)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

## Start Spark
master = "local[*]" ; app_name = "Abnormal_Vibration_Watchdog"
sc = SparkContext(master = master, appName = app_name)
spark = SparkSession(sc).builder.getOrCreate()

display(HTML("<em>To monitor jobs &amp; stages, go to the " +
             "<a href='" + spark.sparkContext.uiWebUrl + "/environment/'" +
             " target='_blank'>Spark UI</a> (on your Spark master host)</em>"))
To monitor jobs & stages, go to the Spark UI (on your Spark master host)
We can now set a reference to our Mongo DB collection through a Spark DataFrame. We will keep focusing on test #2 :
In [8]:
database_name = 'nasa_ims_database'
pipeline = "[]" # "[{ $match: {test_id: " + str(test_id) + "} }]"

df = (
    spark.read
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "mongodb://" + username + ":" + password + "@127.0.0.1:27017")
        .option("database", database_name)
        .option("collection", "measurements")
        .schema(schema) # skips the (slow) "records sampling / schema retrieval" internals
        .option("batchSize", 70_000)
        .option("pipeline", pipeline)
        .load()
)
As can be seen below, our Spark DataFrame displays the expected test measurements attributes :
In [9]:
df.printSchema()
root
 |-- _id: struct (nullable = false)
 |    |-- oid: string (nullable = false)
 |-- sensor_1: double (nullable = false)
 |-- sensor_2: double (nullable = false)
 |-- sensor_3: double (nullable = false)
 |-- sensor_4: double (nullable = false)
 |-- sensor_5: double (nullable = true)
 |-- sensor_6: double (nullable = true)
 |-- sensor_7: double (nullable = true)
 |-- sensor_8: double (nullable = true)
 |-- step: integer (nullable = false)
 |-- test_id: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)

3.1. Data Pipeline (go back to top ↑)

In this chapter, we perform distributed data pre-processing. The data pipeline employed is made up of the stages depicted in the figure below. We detail these stages in the upcoming sections, performing some additional exploratory data analysis along the way.
IMS BEARINGS SENSORY MEASUREMENTS - DATA PIPELINE

3.1.a. Aggregation (go back to top ↑)

As mentioned in §1., the dataset we're working with was initially used for bearing prognostics and weak [failure] signature detection. The original research paper considered the root mean square (RMS) of the vibration amplitude. We will instead look at the average of absolute vibration amplitudes across all steps of each measurement. To operate this transformation on such a voluminous dataset, we turn to the distributed computing capabilities that Spark makes easily accessible :
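To make the difference between the two aggregates concrete, here is a quick NumPy comparison over one hypothetical (made-up) 1-second measurement; both summarize vibration energy, and we simply pick the second one :

```python
import numpy as np

# one hypothetical 1-second measurement (a few made-up vibration samples)
measurement = np.array([0.049, -0.071, 0.132, -0.010, 0.042])

# root mean square, as used in the original research paper
rms = np.sqrt(np.mean(measurement ** 2))

# average of absolute amplitudes, as computed in our Spark pipeline below
mean_abs = np.mean(np.abs(measurement))

print(round(rms, 4), round(mean_abs, 4))
```

Note that RMS is always at least as large as the mean absolute amplitude (power mean inequality), and it weighs large excursions more heavily.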
In [10]:
from pyspark.sql import functions as F

start_time = datetime.datetime.now()

df_abs = df.withColumn('sensor_1', F.abs(df.sensor_1)) \
        .withColumn('sensor_2', F.abs(df.sensor_2)) \
        .withColumn('sensor_3', F.abs(df.sensor_3)) \
        .withColumn('sensor_4', F.abs(df.sensor_4)) \
        .withColumn('sensor_5', F.abs(df.sensor_5)) \
        .withColumn('sensor_6', F.abs(df.sensor_6)) \
        .withColumn('sensor_7', F.abs(df.sensor_7)) \
        .withColumn('sensor_8', F.abs(df.sensor_8))
df_aggregated = df_abs.groupBy(["test_id", "timestamp"]).agg(
    F.mean('sensor_1').alias('sensor_1')
    , F.mean('sensor_2').alias('sensor_2')
    , F.mean('sensor_3').alias('sensor_3')
    , F.mean('sensor_4').alias('sensor_4')
    , F.mean('sensor_5').alias('sensor_5')
    , F.mean('sensor_6').alias('sensor_6')
    , F.mean('sensor_7').alias('sensor_7')
    , F.mean('sensor_8').alias('sensor_8')
)
silent = df_aggregated.cache()
print_timedelta(start_time)
completed in 00:00:00
Let's now launch the data transformation job and retrieve the (aggregated) result in memory. Note that we materialize this intermediary step here only so we can explore the result of this pre-processing stage; collecting the data at this point would otherwise serve no purpose.

REMARK : We notify Spark to mark this intermediary dataframe for caching, as it is an intermediary part of our larger data transformation pipeline. That way, our Spark cluster will not need to re-process that stage when we later resume our pre-processing data pipeline.

In [11]:
start_time = datetime.datetime.now()
records = df_aggregated.rdd.collect() # initiate Spark computation
print_timedelta(start_time)
completed in 00:23:07
In [12]:
data = pd.DataFrame(
        records
        , columns = records[0].__fields__
    ).sort_values(by=['test_id', 'timestamp']).reset_index(drop=True)
Here are the respective start dates of the 3 tests :
In [13]:
test_start = data.groupby('test_id')['timestamp'].min()
print(test_start.to_string())
test_id
1   2003-10-22 14:06:24
2   2004-02-12 11:32:39
3   2004-03-04 10:27:46
For enhanced clarity, let's add a 'relative time from t0' column, which we'll name 'relative_timestamp', and plot the different timelines :
In [14]:
relative_timestamps = []
for tuple_row in data.itertuples() :
    test_id = tuple_row[1]
    relative_timestamps.append(tuple_row[2] - test_start[test_id])
    #print(relative_timestamps[-1])
data['relative_timestamp'] = relative_timestamps
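Note that the itertuples loop above can equivalently be written in a vectorized fashion with a pandas groupby-transform (same result, usually faster on large frames); a small sketch on a made-up miniature of our aggregated frame :

```python
import pandas as pd

# hypothetical miniature version of our aggregated 'data' frame
data = pd.DataFrame({
    'test_id': [1, 1, 2],
    'timestamp': pd.to_datetime(['2003-10-22 14:06:24',
                                 '2003-10-22 14:16:24',
                                 '2004-02-12 11:32:39']),
})

# broadcast each test's 't0' back onto every row, then subtract
data['relative_timestamp'] = (
    data['timestamp'] - data.groupby('test_id')['timestamp'].transform('min')
)
print(data['relative_timestamp'].tolist())
```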
In [15]:
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import test_plot

fig, ax = plt.subplots(nrows = 1, ncols = 3, sharex='all', sharey='all', figsize=(12, 6), dpi=80)
test_plot(data, test_id = 1, ax = ax[0])
test_plot(data, test_id = 2, ax = ax[1])
test_plot(data, test_id = 3, ax = ax[2])
ax[0].legend() ; ax[0].set_ylabel('Amplitude (one-second average absolute)')
fig.suptitle('Bearing Sensor Data', fontsize=16)
fig.tight_layout() ; fig.subplots_adjust(top=.94) ; plt.show()
As we can see on the above plot, test #1 is missing several large chunks of data (portions showing as straight lines over the time axis in the figure's left-most quadrant), and test #2 only contains data pertaining to the last few days of the experiment. We will therefore focus on test #3 only to train our model.

REMARKS  :
  • test #3 also has missing data for some measurements (at most a few hours' worth), but far less than test #1 (which misses way more than just a few days). This cannot, however, be perceived on the figure, due to scaling.
  • still on test #3, the spikes observed on the 'sensor_2' signal, when analysed closely, proved to be signals very similar to the surrounding measurements on that sensor (just before and just after), with very comparable spectra, with the exception of a "constant / 0Hz / offset" component of unknown nature. Maybe it is due to the 'sensor_2' hardware itself, which may be showing early signs of imminent failure. But this is pure speculation; we couldn't uncover the real reason with just the information at hand. It is however interesting to notice that the other sensors of that experiment are not affected by that phenomenon.
You can find details on those 2 remarks on test #3 in this dedicated short notebook .

3.1.b. Train split and scaling (go back to top ↑)

To train our Autoencoder, we need to isolate an input dataset consisting of measurements made in healthy (non-faulty) conditions. We will thus exclude the last 4 days of measurements from the training process. Recall that we consider test #3. And, as we wish to train a model relevant for a wide variety of sensor positions, we treat the input data from all 4 sensors.
We employ Spark SQL for this phase of the data transformation. For brevity, the my_TS_Anomaly_lib.nasa_bearing_ims_spark.get_sensors_capped_sql_statement function generating the necessary statement has been moved to the library (and can be explored there) :
In [16]:
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import get_sensors_capped_sql_statement

train_test_id = 3 ; save_last_days_count = 4 # train on test #3, excluding its last 4 days

df_aggregated.filter(df_aggregated['test_id'] == train_test_id).createOrReplaceTempView("test_"+str(train_test_id)+"_agg_tab")
sql_statement = get_sensors_capped_sql_statement(spark, train_test_id, save_last_days_count)
#print(sql_statement)

query_resultset = spark.sql(sql_statement)
capped_df = \
    query_resultset.toDF('timestamp', 'sensor_1', 'sensor_2', 'sensor_3', 'sensor_4')

silent = capped_df.cache()
#capped_df.explain() ; #print(str(capped_df))
Now that we have data from healthy working conditions at hand, let's consider scaling it. In Machine Learning, there are 2 usual reasons for scaling input data :
  • it either involves "normalizing" all input features so that none dominates the others when computing distances between observations (PCA feature decomposition or K-nearest-neighbors calculation, for instance). Here, we only have 1 input feature (the vibration signal), so this does not apply to us (and we do not measure distances between observations).
  • or it is applied to accelerate the convergence of algorithms involving gradient descent. In our present case, the original scale is already in the sweet zone, i.e. in the vicinity of the [-1, 1] area (it actually lies in a sub-window, [0, 0.1], as we observed earlier). So it doesn't apply to us either.
In our case, we use MinMaxScaler to expand the range of values of our single input feature from the current [0, 0.1] range up to the [0, 1] range, in the hope that it helps our model learn and gain better sensitivity to variations.
In [17]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

columns_to_scale = ['sensor_1', 'sensor_2', 'sensor_3', 'sensor_4']
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="features", handleInvalid="keep")
scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures')
pipeline = Pipeline(stages = [assembler] + [scaler])

# initiate Spark computation (eager transformation)
start_time = datetime.datetime.now()
scalerModel = pipeline.fit(capped_df) # fitting the scalers
print_timedelta(start_time)

originalMins = scalerModel.stages[1].originalMin ; originalMaxs = scalerModel.stages[1].originalMax
#print("MinMaxScaler (sensor_1) : " + str(originalMins[0]) + " ; " + str(originalMaxs[0]))
completed in 00:02:50
We save the scaling functions for later use, when we'll generate predictions on the fly (during the testing of our models or in production performing live sensor monitoring) :
In [18]:
def create_scaler_func(
    originalMin
    , originalMax
) :
    def sensor_abs_mean_measurements_scaler(Xreal) :
        return (Xreal - originalMin) / ( originalMax - originalMin )
    return sensor_abs_mean_measurements_scaler
#create_scaler_func(originalMins[0], originalMaxs[0])([.092, .025]) # scaler to be used on sensor 1 ".abs().avg()" data
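For instance, a scaler closed over hypothetical min/max bounds of 0.0 and 0.1 (matching the [0, 0.1] range observed earlier) maps the mid-range value 0.05 to 0.5; a self-contained sketch repeating the closure from the cell above :

```python
def create_scaler_func(originalMin, originalMax):
    # returns a min-max scaling closure over the fitted bounds
    def sensor_abs_mean_measurements_scaler(Xreal):
        return (Xreal - originalMin) / (originalMax - originalMin)
    return sensor_abs_mean_measurements_scaler

# hypothetical bounds, for illustration only
scaler = create_scaler_func(0.0, 0.1)
print(scaler(0.05), scaler(0.1))  # -> 0.5 1.0
```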
In [19]:
for i in range(4) :
    with open(os.path.join(model_root, 'sensor_'+str(i+1)+'_scaler.pickle'), 'wb') as file_pi:
        pickle.dump(create_scaler_func(originalMins[i], originalMaxs[i]), file_pi, protocol=3)

3.2. Transformed Data (go back to top ↑)

Now that we have most of the data transformation workflow in place, including our fitted scalers, it remains to collect the resulting data and "de-vectorize" it :
In [20]:
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import vector_row_element

# splitting the scaledFeatures vector (of 4-elements rows)
df_scaled = \
    scalerModel.transform(capped_df).select(
    ["timestamp"] +
    ["sensor_"+str(i+1) for i in range(4)] +
    [
        vector_row_element("scaledFeatures", F.lit(i)).alias("scaled_sensor_"+str(i+1))
        for i in range(4)
    ]
)
silent = df_scaled.cache()

# initiate Spark computation
start_time = datetime.datetime.now()
scaled_records = df_scaled.rdd.collect()
print_timedelta(start_time)
completed in 00:01:28
In [21]:
scaled_data = pd.DataFrame(
    scaled_records
    , columns = scaled_records[0].__fields__
).reset_index(drop=True)
Below, we can visualize our transformed training data. The output of our data pipeline is made up of the average absolute measurement values from all the sensors, each rescaled to occupy the entire [0, 1] range :
In [22]:
fig, ax = plt.subplots(figsize = (16, 3))
plt.title('Scaled Bearings Vibration Displacements\n' +
          '(absolute mean over 1-second long measurements)'
          , fontsize=16)
scaled_data.plot(x = 'timestamp', y = ['scaled_sensor_1', 'scaled_sensor_2'
                                       , 'scaled_sensor_3', 'scaled_sensor_4']
                 , alpha = 0.4, ax = ax)
plt.legend(ncol=2) ; plt.show()
#ax.set_ylabel('Vibration Amplitude\n(one-second average absolute - SCALED)')

Since we're done with them, we can now clear from the Spark cache the objects we had marked as persistent along the way :

In [23]:
silent = spark.catalog.dropTempView("test_"+str(train_test_id)+"_agg_tab")
silent = df_aggregated.unpersist()
silent = capped_df.unpersist()
silent = df_scaled.unpersist()
In [24]:
spark.stop()


backup :

In [25]:
with open(os.path.join(pickle_root, 'records.pickle'), 'wb') as file_pi:
    pickle.dump(records, file_pi)
In [26]:
with open(os.path.join(pickle_root, 'scaled_records.pickle'), 'wb') as file_pi:
    pickle.dump(scaled_records, file_pi)

reload :

In [27]:
with open(os.path.join(pickle_root, 'records.pickle'), 'rb') as file_pi:
    records = pickle.load(file_pi)
In [28]:
with open(os.path.join(pickle_root, 'scaled_records.pickle'), 'rb') as file_pi:
    scaled_records = pickle.load(file_pi)

4. Keras LSTM Autoencoder (go back to top ↑)

The Encoder-Decoder model architecture is widespread across a range of AI applications. The purpose of the Encoder half is to extract high-level features from an input and encode them into a vector (an embedding) of lower dimension than the input itself. The Decoder half of such models generates an outcome from this set of encoded high-level characteristics. Very often, they are used in supervised learning contexts.
Autoencoders form a sub-family of Encoder-Decoder models. Their particularity is that they are utilized in a semi-supervised manner. In layman's terms :
  • they are trained in a supervised fashion using the input as the target output (they are trained to "predict" the input : they are fed "A" and told that the outcome they should generate is "A" itself). The goal is to minimize the reconstruction error.
  • they are afterwards utilized to "predict" the outcome when new records are fed to them. We look at the "error" between input and output and, if that error is higher than the error observed during training, the new input is considered to be of abnormal nature.
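The detection rule of the second bullet can be sketched numerically. A minimal NumPy illustration, with made-up reconstruction errors and one plausible convention (flagging anything beyond the worst training error; the actual threshold choice is discussed later) :

```python
import numpy as np

# made-up per-sequence reconstruction errors observed during training
train_errors = np.array([0.010, 0.012, 0.009, 0.011, 0.013])

# one common convention: flag anything beyond the worst training error
threshold = train_errors.max()

# made-up errors measured on new, incoming sequences
new_errors = np.array([0.011, 0.045, 0.010])

is_anomaly = new_errors > threshold
print(is_anomaly.tolist())  # -> [False, True, False]
```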
Our training input signal is 5,790 timestamps long. We'll feed it to a sequence deep learning model. For that purpose, we generate a training input dataset of sequence slices of 30 timestamps each (a moving time window, using a stride of 1). Let's build such a dataset to train a model on sensor_1 data :
In [29]:
TIME_STEPS = 30 # moving time window length

def create_sequences(X, y, time_steps=TIME_STEPS):
    Xs, ys = [], []
    for i in range(len(X)-time_steps):
        Xs.append(X.iloc[i:(i+time_steps)].values)
        ys.append(y.iloc[i:(i+time_steps)].values)

    return np.array(Xs), np.array(ys)

X_train, y_train = create_sequences(scaled_data[['scaled_'+sensor_name]], scaled_data[['scaled_'+sensor_name]])
assert ~(X_train != y_train).any(), "the input sequencing method didn't provide outputs identical to inputs"
print(f'Training input shape: {X_train.shape}')
print(f'Training input shape: {X_train.shape}')
Training input shape: (5760, 30, 1)
The implication of this design choice is that, for our model to detect a potential anomaly at timestamp "t", we'll feed it a [t-30, t] measurements sequence, to give it some context when asked for a prediction. Our model will not work off an isolated measurement alone, making it better able to identify early warning signs of deteriorating bearing working conditions.
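As an aside, the moving-window slicing performed by create_sequences can be obtained without the Python loop via NumPy's sliding_window_view (available since NumPy 1.20). A small sketch on a stand-in signal; note it keeps one more window than our loop above, which stops at len(X)-time_steps :

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

TIME_STEPS = 30
signal = np.arange(100, dtype=float)  # stand-in for one scaled sensor column

# shape: (100 - 30 + 1, 30) -> one row per window position (stride of 1)
windows = sliding_window_view(signal, TIME_STEPS)

# add the trailing 'features' axis expected by a Keras LSTM: (samples, time_steps, 1)
X = windows[..., np.newaxis]
print(X.shape)  # -> (71, 30, 1)
```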

4.1. Model Architecture (go back to top ↑)

As contemplated in the previous section, we will be working with an Encoder-Decoder Recurrent Neural Network architecture. Both the Encoder and the Decoder will consist of stacked LSTM layers. We will optimize the architecture for how deep each of them shall be, as well as for how wide each of their layers shall be.

A brief refresher on LSTM layers : in the Keras world, "lstm_units" is the dimension of the output vector of an LSTM layer. The layer contains multiple parallel LSTM units, "structurally" identical but each eventually "learning to remember" different things from the input sequence. If "return_sequences=True", each of those dimensions yields a sequence of the same length as the input sequence.
Let's see how this goes for the first LSTM layer of our model. In our case, we only have the vibration signal as input, so it is a single-feature input. For each training record, we thus have a "time_steps x 1" matrix as input, which yields a "time_steps x lstm_units" matrix as output. The LSTM layer can be viewed as a feature extractor with "lstm_units" features, all with time-distributed values.
If you'd like to read other wordings for this, you may refer to these two forum threads : Quora , StackExchange .

The general architecture of our model will be as follows :
Where :
  • both the Encoder body and the Decoder body will have the same number "num_layers" of LSTM layers
  • each LSTM layer in the Encoder body will have double the amount of lstm_units of its successor and
  • every LSTM layer in the Decoder body will have double the amount of lstm_units of its predecessor
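Under the width rule stated above, the layer sizes are fully determined by "num_layers" and the bottleneck width "lstm_units". A tiny hypothetical helper (not the my_TS_Anomaly_lib implementation, just one plausible reading of the rule) makes it explicit :

```python
def layer_widths(num_layers, lstm_units):
    """Return (encoder_widths, decoder_widths): the Encoder halves the unit
    count down to the 'lstm_units' bottleneck; the Decoder mirrors it back."""
    encoder = [lstm_units * 2 ** i for i in reversed(range(num_layers))]
    decoder = encoder[::-1]
    return encoder, decoder

print(layer_widths(3, 16))  # -> ([64, 32, 16], [16, 32, 64])
```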

You can view how this has been implemented in the my_TS_Anomaly_lib.model.autoencoder_model model-building function. If you're impatient to see what the resulting tuned model looks like, you can skip the optimization sections and jump directly to §4.3. Best Tuned Model.

REMARK : the output of the "encoder_lstm_tail" layer of the above figure is the "encoding" of the input signal. We feed our Encoder a record of shape "time_steps x features_count" and get a (flat) vector of length "lstm_units". Our Encoder thus does "encode" a time sequence into a (flat) 1D vector. This encoding is often used as a dimension-reduced representation of a time sequence. It is indeed demonstrably a "signature" of the time sequence, since even our Decoder here can reconstruct the input time sequence using only the encoding as input.

4.2. Hyperparameters tuning (go back to top ↑)

Let's try to optimize our model architecture against signal reconstruction loss.

4.2.a. Bayesian Optimization (go back to top ↑)

We rely on a  Keras Tuner , and more particularly its implementation of the Bayesian Optimization algorithm, which aims at efficiently converging towards an optimal set of hyperparameter values. We allow our tuner 12 attempts at optimizing the hyperparameters of our model; these are called 'trials'. Since there is some variability inherent to the training process, each set of hyperparameters will be trained 3 times; these are called 'executions'.
In [30]:
from my_TS_Anomaly_lib.bayesian_optimization import KerasTunerBayesianOptimization
from my_TS_Anomaly_lib.model import autoencoder_model

max_trials = 12 ; executions_per_trial = 3

tuner_root = os.path.join('.', 'tuner', sensor_name)
timestamp = datetime.datetime.now().strftime("%m%d%H%M") # not too long, or tensorboard fails (dir path)
print("Session timestamp : " + str(timestamp))
tuner = KerasTunerBayesianOptimization(
    autoencoder_model(time_steps = X_train.shape[1], features_count = X_train.shape[2])
    , objective = 'val_loss'
    , max_trials = max_trials
    , executions_per_trial = executions_per_trial
    , directory = tuner_root
    , project_name = timestamp
    , overwrite = True)
Session timestamp : 12021308
Here are the properties of the hyperparameter domain we set through the 'autoencoder_model' model-building method for the tuner to search :
In [31]:
tuner.search_space_summary()
Search space summary
Default search space size: 4
num_layers (Int)
{'default': None, 'conditions': [], 'min_value': 0, 'max_value': 3, 'step': 1, 'sampling': None}
lstm_units (Int)
{'default': None, 'conditions': [], 'min_value': 4, 'max_value': 290, 'step': 1, 'sampling': 'log'}
dropout_rate (Float)
{'default': 0.01, 'conditions': [], 'min_value': 0.01, 'max_value': 0.2, 'step': None, 'sampling': 'linear'}
learning_rate (Choice)
{'default': 0.001, 'conditions': [], 'values': [0.001, 0.0005, 0.0001, 5e-05, 1e-05], 'ordered': True}
We monitor the tuning process via a tailored TensorBoard callback. In addition to what TensorBoard does when used in conjunction with Keras Tuner, our implementation tracks learning rate decay and measures training times.
In [32]:
from my_TS_Anomaly_lib.jupyter_tensorboard_windows_launcher import JupyterTensorboardWindows

tb_launcher = JupyterTensorboardWindows( os.path.join('.', 'tuner') )
tb_launcher.run()

#On Windows, also view TensorBoard thru "http://localhost:6006/"
%reload_ext tensorboard
# embed a Tensorboard HTML page in an iframe in the below jupyter notebook cell (will only show when running)
%tensorboard --logdir {os.path.join('.', 'tuner').replace("\\", "\\\\")}
webfiles.zip static assets not found: D:\jupyter_notebooks\TimeSeries_Anomaly_Detection\my_TS_Anomaly_lib\webfiles.zip
TensorBoard 2.4.0 at http://localhost:6006/ [ http://localhost:6006 ]
Reusing TensorBoard on port 6006 (pid 4888), started 0:03:14 ago. (Use '!kill 4888' to kill it.)
In [33]:
from tensorflow.keras.callbacks import ReduceLROnPlateau

learning_rate_reduction = ReduceLROnPlateau(monitor='loss' # purposely not 'val_loss', here
                                            , patience=10, factor=0.1
                                            , verbose=1)
In [34]:
from my_TS_Anomaly_lib.bayesian_optimization import TailoredTensorBoard

# BEWARE ! TensorBoard over GPU now requires https://github.com/tensorflow/tensorflow/issues/35860
tensorboard_root = os.path.join(tuner_root, timestamp, 'tb')
os.makedirs(tensorboard_root, exist_ok=True)
tensorboard_callback = TailoredTensorBoard(tensorboard_root)
We are now all set to initiate the actual Bayesian Optimization process :
In [35]:
silent = os.system('cls') # clear terminal

batch_size = 256
nb_epochs = 100

tuner.search(X_train, X_train
             , epochs=nb_epochs, batch_size=batch_size
             , validation_split=0.15
             , callbacks=[learning_rate_reduction, tensorboard_callback]
            )
Trial 12 Complete [00h 41m 16s]
val_loss: 0.10103376458088557

Best val_loss So Far: 0.00439801982914408
Total elapsed time: 12h 53m 13s

4.2.b. Results (go back to top ↑)

Now that our tuner has accomplished its duty, we can retrieve and plot its history to visualize the evolution of each trained model's performance during training :
In [36]:
from my_TS_Anomaly_lib.bayesian_optimization \
    import tuner_tensorboard_to_history_df, plot_tuner_models_histories

history_df = tuner_tensorboard_to_history_df(tensorboard_root, verbose = 0)

plot_tuner_models_histories(history_df)
The figure below shows that the optimization algorithm at first tries hyperparameter values without much improvement in model performance. It does, however, reach an inflection point fairly quickly (after just a couple of attempts), beyond which the performance of the tuned model keeps improving, up to a point where it cannot be further optimized within the hyperparameter domain we imposed.
We can also clearly see that not all sets of hyperparameter values are equal in terms of training time : some translate into models that take much longer to train than others. Depending on what matters most to the end user, model reconstruction loss (sensitivity of the anomaly detection) or prediction speed (which is directly correlated with the training time), we could choose our "best" set of hyperparameter values differently.
In [37]:
from my_TS_Anomaly_lib.bayesian_optimization import plot_tuner_models_performances

plot_tuner_models_performances(history_df)
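If we wanted to make that loss-versus-training-time trade-off explicit, we could rank the trials by a weighted combination of the two criteria. A sketch over a hypothetical per-trial summary (the column names are illustrative, not the actual output of tuner_tensorboard_to_history_df ; the val_loss figures are taken from trials 3 to 5 of the table below, the training times are made up) :

```python
import pandas as pd

# Hypothetical per-trial summary (illustrative column names and timings)
trials = pd.DataFrame({
    'trial': [3, 4, 5],
    'val_loss': [0.004500, 0.004398, 0.005028],
    'train_minutes': [62.0, 41.0, 18.0],
})

# Normalize both criteria to [0, 1], then minimize a weighted sum
w_loss = 0.7  # how much we favor reconstruction loss over training speed
norm = lambda s: (s - s.min()) / (s.max() - s.min())
score = (w_loss * norm(trials['val_loss'])
         + (1 - w_loss) * norm(trials['train_minutes']))
best = trials.loc[score.idxmin(), 'trial']
```

Sliding w_loss toward 0 would favor fast-training models, toward 1 it reduces to plain val_loss minimization.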
For information, the set of hyperparameter values associated with the different tuner trials are as follows :
In [38]:
from my_TS_Anomaly_lib.bayesian_optimization import tuner_trials_hparams_to_df
from my_TS_Anomaly_lib.utils import df_highlight_row

trials_hp = tuner_trials_hparams_to_df(tuner_project_dir = os.path.join(tensorboard_root, '..'))
df_highlight_row(df = trials_hp.drop(['trial_id'], axis=1)
                 , row_index = trials_hp['val_loss (best)'].idxmin())
Out[38]:
dropout_rate learning_rate lstm_units num_layers val_loss (best)
trial
0 0.169098 0.000010 32 1 0.039288
1 0.167880 0.000100 270 2 0.005585
2 0.142170 0.000100 205 2 0.005592
3 0.139651 0.001000 290 3 0.004500
4 0.010000 0.001000 290 2 0.004398
5 0.010000 0.001000 8 3 0.005028
6 0.010000 0.000050 290 3 0.004917
7 0.010000 0.001000 4 0 0.005346
8 0.077242 0.001000 290 0 0.004974
9 0.177316 0.001000 4 3 0.018344
10 0.200000 0.001000 290 1 0.005219
11 0.010000 0.000010 4 3 0.101034
When comparing the above table with the previous figure, we can for instance notice that, logically, the deeper the model (the higher 'num_layers'), the longer it takes to train.


backup :

In [39]:
with open(os.path.join(model_root, sensor_name+'_trained_tuner.pickle'), 'wb') as file_pi:
    pickle.dump(tuner, file_pi)
In [40]:
with open(os.path.join(model_root, sensor_name+'_hyperparameter.json')
          , 'w', encoding='utf-8') as json_file :
    json.dump(
        tuner.get_best_hyperparameters()[0].__dict__['values']
        , json_file, ensure_ascii=False, indent=4)
In [41]:
tuner.get_best_models()[0].save_weights(os.path.join(model_root, sensor_name+'_weights.hdf5'))

reload :

In [42]:
TIME_STEPS = 30 ; train_test_id = 3 ; sensor_name = 'sensor_1' ### <<<=== DEBUG - IGNORE THIS WHOLE CELL !!!!
max_trials = 12 ; executions_per_trial = 3
tuner_root =  os.path.join('.', 'tuner', sensor_name)
timestamp = '12021315' ; tensorboard_root = os.path.join(tuner_root, timestamp, 'tb')
save_last_days_count = 4
In [43]:
with open(os.path.join(model_root, sensor_name+'_trained_tuner.pickle'), 'rb') as file_pi:
    tuner = pickle.load(file_pi)
In [44]:
# override "best_model" (if ever useful)
with open(os.path.join(model_root, sensor_name+'_hyperparameter.json')) as json_file:
    best_hyperparameters_dict = json.load(json_file)
best_model = rebuild_model(best_hyperparameters_dict) # compiled but untrained
#best_model.summary()
best_model.load_weights(os.path.join(model_root, sensor_name+'_weights.hdf5'))


4.3. Best Tuned Model (go back to top ↑)

In [45]:
best_model = tuner.get_best_models()[0]
best_hps = tuner.get_best_hyperparameters()[0]
#best_model.summary()
In [46]:
from my_TS_Anomaly_lib.model import plot_autoencoder

architecture_plot = plot_autoencoder(best_model)
silent = cv2.imwrite(os.path.join('.', 'images', 'architecture.png'), architecture_plot)
Time Series / Anomaly Detection
click to enlarge
As previously stated, we allowed our Tuner to search the following hyperparameters space :
   - Int(name: "num_layers", min_value: 0, max_value: 3, step: 1, sampling: None, default: 0)
   - Int(name: "lstm_units", min_value: 4, max_value: 290, step: 1, sampling: log, default: 4)
   - Float(name: "dropout_rate", min_value: 0.01, max_value: 0.2, step: None, sampling: linear, default: 0.01)
   - Choice(name: "learning_rate", values: [0.001, 0.0005, 0.0001, 5e-05, 1e-05], ordered: True, default: 0.001)
As alluded to in §4.2.b. Results, the fine-tuned "best" model identified by the Bayesian Optimization method has the architecture represented on the figure on the left, along with the optimal training parameters 'learning_rate=0.001' and 'dropout_rate=0.01'. The Encoder body and the Decoder body each have 'num_layers=2' LSTM layers, and both the 'encoder_lstm_tail' and 'decoder_lstm_head' layers are made up of 'lstm_units=290' units.
Our LSTM Autoencoder being trained, we can now measure how well it performs at encoding/decoding the transformed vibration signal of a bearing in healthy working conditions. To do that, we measure the signal reconstruction loss for all timestamps of the training dataset :
In [47]:
# plot the loss distribution of the training set

X_pred = best_model.predict(X_train)

# compute our model signal reconstruction loss (MeanAbsoluteError)
scored_train = pd.DataFrame()
scored_train['Loss_mae'] = np.mean(np.abs(X_pred-X_train), axis = 1).flatten()
threshold = scored_train['Loss_mae'].max()

plt.figure(figsize = (15, 3))
plt.title('Distribution of the Signal Reconstruction Loss\n' +
          '(training dataset, ' + sensor_name + ', normal bearing working conditions)'
          , fontsize=14)
n, x, _ = plt.hist(scored_train['Loss_mae'], bins = 200, density=True
                   , color = 'purple', alpha=0.4)
density_fct = stats.gaussian_kde(scored_train['Loss_mae'])
plt.plot(x, density_fct(x), color = 'purple')
plt.xlabel('Mean Absolute Error') ; plt.ylabel('Timestamps (density)')
#plt.xlim([0,.003]) ; 
plt.axvline(x=threshold, color='orange')
ymin, ymax = plt.ylim()
plt.annotate('Anomaly Loss Threshold', xy=(threshold, ymax), xytext=(-10, -15)
             , textcoords='offset points', rotation=90, va='top', ha='center'
             , annotation_clip=False, color='orange')
plt.show()
In [48]:
loss_distrib = scored_train.describe(percentiles=[.25, .5, .75, .98, .9995, .1]).T
display(loss_distrib)
count mean std min 10% 25% 50% 75% 98% 99.95% max
Loss_mae 5760.0 0.045085 0.008072 0.02377 0.035009 0.039374 0.044505 0.050452 0.062836 0.07364 0.074797
That way, we defined the threshold value above which the reconstruction loss characterizes an anomaly : 'mae ≥ 0.0748'.
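Note that taking the training maximum as threshold is the most permissive choice and is sensitive to outliers in the training losses ; a common, slightly stricter alternative (not used here) would be a high quantile, e.g. the 99.95% shown in the table above. A minimal sketch :

```python
import numpy as np
import pandas as pd

def anomaly_threshold(train_losses, q=0.9995):
    """Pick the anomaly threshold as a high quantile of the training
    reconstruction losses (more robust to outliers than the max)."""
    return float(np.quantile(train_losses, q))

losses = pd.Series([0.02, 0.04, 0.05, 0.06, 0.20])  # 0.20 is an outlier
thr_max = losses.max()
thr_q = anomaly_threshold(losses, q=0.75)  # ignores the outlying max
```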

5. Anomaly Detection - Hard Failure Early Warning (go back to top ↑)

Let's now challenge our semi-supervised RNN model and see how it performs when confronted with the vibration sensor data of a bearing that ends up dying of a hard failure.
In [49]:
# retrieve time series after the "saved last days" date
# (from the source dataset kept out of the training dataset)
test_start_datetime = \
    data['timestamp'][data['test_id'] == train_test_id] \
        .max() + pd.Timedelta(days=-save_last_days_count)
test_data = \
    data[(data['test_id'] == train_test_id)
         & (data['timestamp'] >= test_start_datetime)
        ][['timestamp', sensor_name]].reset_index(drop=True)
In [50]:
# apply scaling
with open(os.path.join(model_root, sensor_name + '_scaler.pickle'), 'rb') as file_pi:
    sensor_scaler_fct = pickle.load(file_pi)

X_test = pd.DataFrame(sensor_scaler_fct(
    list(test_data[sensor_name].values) + ([0]*TIME_STEPS)
))
In [51]:
# create the input sequences
X_test, y_test = create_sequences(X_test, X_test)

print(f'Testing shape: {X_test.shape}')
Testing shape: (534, 30, 1)
At this stage, we have time series data for 534 successive timestamps, each being a sequence of length 30. Those are scaled mean absolute displacement records.

We feed them to our predictor. In production, each of these 30-timestamps long sequences would be fed one at a time, when they become available. We, here, can see the historical trace of an actual failure.
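For reference, a helper like create_sequences can be sketched as a plain sliding-window builder (the exact behaviour of the notebook's own helper may differ, e.g. regarding the padding applied above) :

```python
import numpy as np

def make_sequences(values, time_steps=30):
    """Sliding-window sequence builder: from N scaled samples, produce
    N - time_steps + 1 overlapping windows of shape (time_steps, 1)."""
    values = np.asarray(values, dtype=float).reshape(-1, 1)
    return np.stack([values[i:i + time_steps]
                     for i in range(len(values) - time_steps + 1)])

# 563 samples -> 534 overlapping windows of shape (30, 1)
X = make_sequences(np.arange(563), time_steps=30)
```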
In [52]:
# calculate the loss on the test set
# for each of the test timestamps
# (we do it here in one batch)
X_pred = best_model.predict(X_test)

# calculating the signal reconstruction loss timeline
scored_test = pd.DataFrame()
scored_test['Loss_mae'] = np.mean(np.abs(X_pred-X_test), axis = 1).flatten()
scored_test['Threshold'] = threshold
scored_test['Anomaly'] = (scored_test['Loss_mae'] > scored_test['Threshold']).shift(TIME_STEPS)
scored_test.loc[:TIME_STEPS, 'Anomaly'] = False # moving window "past before start"

print("Detected " + str(scored_test['Anomaly'].sum()) +
      " abnormal bearing vibration timestamps in total.")
Detected 158 abnormal bearing vibration timestamps in total.
Recall that we feed our model with signals that are 30 timestamps long. Our model detects an anomaly as soon as it enters the 30-timestamps-long window (at the end of that time window), for as long as it remains within that window, and until it finally leaves it (at the very beginning of the moving window), if ever. The apparent 'shift' in anomaly detection on the plot below thus denotes no lateness in the detection mechanism, only the fact that, each time it detects an anomaly, our model is being presented with a time window of the 30 past datapoints.
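A toy example makes this window effect concrete (a sketch, with a shortened window of 5 for readability) :

```python
import numpy as np

TIME_STEPS = 5                      # shortened window for illustration
signal = np.zeros(20)
signal[10] = 1.0                    # a single anomalous spike at index 10

# build the same overlapping sliding windows the model consumes
windows = np.stack([signal[i:i + TIME_STEPS]
                    for i in range(len(signal) - TIME_STEPS + 1)])
hit = windows.max(axis=1) > 0       # windows "seeing" the spike

# the spike enters window 6 (as its last element) and leaves after
# window 10 (as its first element): TIME_STEPS consecutive hits
```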
In [53]:
from my_TS_Anomaly_lib.model import plot_predictions

plot_predictions(test_data, scored_test)
What the bottom panel of the above chart shows is that anomalies are detected not when there is an increase in the input signal (even a possibly quite sudden one), but when the signal fails, after a short while, to recover the level it previously held.
In [54]:
first_anomaly_timestamp = test_data.loc[scored_test[scored_test['Anomaly']].index[0], 'timestamp']
last_timestamp = pd.to_datetime(test_data[['timestamp']].iloc[-1].values[0], unit='D')
The test ends on 2004-04-18 04:42 AM and our predictor detects the first signs of abnormal vibrations on sensor_1 on 2004-04-15 05:52 PM. Thanks to our Anomaly Detection mechanism, we had early warnings of imminent mechanical failure on sensor_1 as early as 2 days 10:50:00 in advance.
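The lead time quoted above is simply the difference between the end-of-test timestamp and the first detected anomaly ; a quick check with the two timestamps from the text :

```python
import pandas as pd

# timestamps quoted in the text above
first_anomaly = pd.Timestamp('2004-04-15 17:52')
test_end = pd.Timestamp('2004-04-18 04:42')

lead_time = test_end - first_anomaly  # early-warning margin
```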
In [ ]:
 

Export Notebook to HTML (with Markdown extension cells evaluated and hidden cell outputs omitted) :

In [55]:
import os ; from my_TS_Anomaly_lib.utils import get_notebook_fullname
get_notebook_fullname()
Out[55]:
'D:\\jupyter_notebooks\\TimeSeries_Anomaly_Detection\\main.ipynb'

(to be launched from the conda env which holds the Jupyter installation, the one named 'r-tensorflow', in our personal case)


conda activate r-tensorflow
python
import os ; os.getcwd()
os.chdir('D:\jupyter_notebooks\TimeSeries_Anomaly_Detection') # to set the working directory
from my_TS_Anomaly_lib.jupyter_markdown_extension import md_extension_to_html
md_extension_to_html('D:\jupyter_notebooks\TimeSeries_Anomaly_Detection\main.ipynb')
"Ctrl+D" (to exit python console mode)

In [ ]:
 

CLEAR

In [56]:
tensorflow.keras.backend.clear_session()
In [ ]:
 